El objetivo de esta práctica es proporcionar una introducción al procesamiento de señales temporales como es el caso de la señal de voz, y desarrollar de un detector de actividad de voz basado en redes neuronales recurrentes.
CUIDADO: * Los datos proporcionados son de uso exclusivo para esta práctica. No tiene permiso para copiar, distribuir o utilizar el corpus para ningún otro propósito.
from google.colab import files
uploaded = files.upload()
Una vez cargado el fichero de audio, podemos escucharlo de la siguiente manera:
import IPython
wav_file_name = "audio_sample.wav"
print(wav_file_name)
IPython.display.Audio(wav_file_name)
A continuación vamos a definir ciertas funciones para poder hacer manejo de ficheros de audio en Python.
Comenzamos definiendo una función read_recording que leerá un fichero de audio WAV, normalizará la amplitud y devolverá el vector de muestras signal y su frecuencia de muestreo fs.
import scipy.io.wavfile
def read_recording(wav_file_name):
fs, signal = scipy.io.wavfile.read(wav_file_name)
signal = signal/max(abs(signal)) # normalizes amplitude
return fs, signal
Si ejecutamos la función anterior para el fichero de ejemplo, podemos ver la forma en la que se carga dicho fichero de audio en Python. Así, podemos obtener la frecuencia de muestreo y la longitud del fichero en número de muestras:
fs, signal = read_recording(wav_file_name)
print("Signal variable shape: " + str(signal.shape))
print("Sample rate: " + str(fs))
print("File length: " + str(len(signal)) + " samples")
PREGUNTAS:
También podemos representar la señal y ver su forma de onda. Para ello, definimos la función plot_signal como sigue:
import matplotlib.pyplot as plt
import numpy as np
def plot_signal(signal, fs, ylabel="", title=""):
dur = len(signal)/fs
step = 1./fs
t_axis = np.arange(0., dur, step)
plt.plot(t_axis, signal)
plt.xlim([0, dur])
plt.ylabel(ylabel)
plt.xlabel('Time (seconds)')
plt.title(title)
plt.grid(True)
Y utilizando la función anterior, obtenemos su representación (amplitud frente al tiempo):
plot_signal(signal, fs, "Amplitude", wav_file_name)
plt.show()
PREGUNTAS:
En esta práctica, vamos a desarrollar un detector de actividad de voz, que determinará qué segmentos de la señal de voz son realmente voz y cuáles silencio.
Por ello, vamos a ver dos ejemplos de etiquetas ground truth, que corresponden al fichero de audio de ejemplo.
Primero, descargamos de Moodle las etiquetas de voz/silencio que están en los ficheros audio_sample_labels_1.voz y audio_sample_labels_2.voz y las cargamos en Google Colab como en el caso anterior.
from google.colab import files
uploaded = files.upload()
Estas etiquetas están guardadas en ficheros de texto y podemos cargarlas en Python de la siguiente manera:
labels_file_name = 'audio_sample_labels_1.voz'
voice_labels = np.loadtxt(labels_file_name)
Con el siguiente código, podemos representar la señal de voz así como sus etiquetas en la misma figura:
plot_signal(signal, fs)
plot_signal(voice_labels*2-1, fs, "Amplitude", wav_file_name)
plt.show()
Las etiquetas de voz/silencio provienen de distintos detectores de actividad de voz.
labels_file_name = 'audio_sample_labels_2.voz'
voice_labels_2 = np.loadtxt(labels_file_name)
plot_signal(signal, fs)
plot_signal(voice_labels_2*2-1, fs, "Amplitude", wav_file_name)
plt.show()
print(f"% de voz en el primer etiquetado: {100*np.mean(voice_labels):.2f}%")
print(f"% de voz en el segundo etiquetado: {100*np.mean(voice_labels_2):.2f}%")
PREGUNTAS:
En la mayoría de sistemas de reconocimiento de patrones, un primer paso es la extracción de características. Esto consiste, a grandes rasgos, en obtener una representación de los datos de entrada, que serán utilizados para un posterior modelado.
En nuestro caso, vamos pasar de la señal en crudo "raw" dada por las muestras (signal), a una secuencia de vectores de características que extraigan información a corto plazo de la misma y la representen. Esta sería la entrada a nuestro sistema de detección de voz basado en redes neuronales.
Para ver algunos ejemplos, vamos a utilizar la librería librosa (https://librosa.org/doc/latest/index.html).
Dentro de esta librería, tenemos funciones para extraer distintos tipos de características de la señal de voz, como por ejemplo el espectrograma en escala Mel (melspectrogram).
Estas características a corto plazo, se extraen en ventanas de unos pocos milisegundos con o sin solapamiento.
Un ejemplo sería el siguiente:
import librosa
mel_spec = librosa.feature.melspectrogram(
signal, fs, n_mels=23, win_length=320, hop_length=160)
print(mel_spec.shape)
print(signal.shape)
from librosa.display import specshow
S_DB = librosa.power_to_db(mel_spec, ref=np.max)
specshow(S_DB, sr=fs, hop_length=160, x_axis='time', y_axis='mel');
plt.colorbar(format='%+2.0f dB');
PREGUNTAS:
De esta manera, podríamos obtener una parametrización de las señales para ser utilizadas como entrada a nuestra red neuronal.
Para los siguientes apartados, se proporcionan los vectores de características MFCC para una serie de audios que se utilizarán como conjunto de entrenamiento del modelo de VAD.
Primero vamos a descargar la lista de identificadores de los datos de entrenamiento de la práctica.
Para ello, necesitaremos descargar de Moodle el fichero training_VAD.lst, y ejecutar las siguientes líneas de código, que nos permitirán cargar el archivo a Google Colab desde el disco local:
from google.colab import drive
drive.mount("/content/drive", force_remount = True)
DIR = "/content/drive/My Drive/pit/"
%cd "$DIR"
A continuación cargamos los identificadores contenidos en el fichero en una lista en Python:
file_train_list = 'training_VAD.lst' # mat files containing data + labels
f = open(file_train_list, 'r')
train_list = f.read().splitlines()
f.close()
Podemos ver algunos de ellos (los primeros 10 identificatores) de la siguiente forma:
print(train_list[:10])
Ahora, descargaremos de Moodle el fichero data_download_onedrive_training_VAD.sh, y ejecutaremos las siguientes líneas de código, que nos permitirán cargar el archivo a Google Colab desde el disco local:
Para descargar el conjunto de datos desde Google drive, ejecutamos el script cargado anteriormente de la siguiente manera:
!chmod 755 data_download_onedrive_training_VAD.sh
!./data_download_onedrive_training_VAD.sh
Este script descargará los datos de Google Drive y los cargará en Google Colab, descomprimiéndolos en la carpeta data/training_VAD.
Podemos comprobar que los ficheros .mat se encuentran en el directorio esperado:
!ls data/training_VAD/ | head
Utilizando la librería Pytorch (https://pytorch.org/docs/stable/index.html), vamos a definir un modelo de ejemplo con una capa LSTM y una capa de salida. La capa de salida estará formada por una única neurona. La salida indicará la probabilidad de voz/silencio utilizando una función sigmoide.
import torch
import torch.nn as nn
import torch.nn.functional as F
class Model_1(nn.Module):
def __init__(self, feat_dim=20):
super(Model_1, self).__init__()
self.lstm = nn.LSTM(feat_dim,256,batch_first=True,bidirectional=False)
self.output = nn.Linear(256,1)
def forward(self, x):
out = self.lstm(x)[0]
out = self.output(out)
out = torch.sigmoid(out)
return out.squeeze(-1)
PREGUNTAS:
Una vez definida la clase, podemos crear nuestra instancia del modelo y cargarlo en la GPU con el siguiente código:
model = Model_1(feat_dim=20)
model = model.to(torch.device("cuda"))
print(model)
Nuestra variable model contiene el modelo, y ya estamos listos para entrenarlo y evaluarlo.
Como hemos visto anteriormente, nuestros datos están guardados en ficheros de Matlab (.mat). Cada uno de estos ficheros contiene una matriz X correspondiente a las secuencias de características MFCC (con sus derivadas de primer y segundo orden), y un vector Y con las etiquetas de voz/silencio correspondientes.
Veamos un ejemplo:
features_file = 'data/training_VAD/features_labs_1.mat'
import scipy.io
features = scipy.io.loadmat(features_file)['X']
labels = scipy.io.loadmat(features_file)['Y']
print(features.shape)
print(labels.shape)
PREGUNTAS: Elegir un fichero de entrenamiento y responder a las siguientes preguntas:
El entrenamiento del modelo se va a realizar mediante descenso por gradiente (o alguna de sus variantes) basado en batches.
Para preparar cada uno de estos batches que servirán de entrada a nuestro modelo LSTM, debemos almacenar las características en secuencias de la misma longitud. El siguiente código lee las características (get_fea) y sus correspondientes etiquetas (get_lab) de un fragmento aleatorio del fichero de entrada.
import scipy.io
import numpy as np
def get_fea(segment, rand_idx):
data = scipy.io.loadmat(segment)['X']
if data.shape[0] <= length_segments:
start_frame = 0
else:
start_frame = np.random.permutation(data.shape[0]-length_segments)[0]
end_frame = np.min((start_frame + length_segments,data.shape[0]))
rand_idx[segment] = start_frame
feat = data[start_frame:end_frame,:20] # discard D and DD, just 20 MFCCs
return feat[np.newaxis, :, :]
def get_lab(segment, rand_idx):
data = scipy.io.loadmat(segment)['Y']
start_frame = rand_idx[segment]
end_frame = np.min((start_frame + length_segments, data.shape[0]))
labs = data[start_frame:end_frame].flatten()
return labs[np.newaxis,:]
PREGUNTAS: Analizar las funciones anteriores detenidamente y responder a las siguientes cuestiones:
Una vez definidas las funciones de lectura de datos y preparación del formato que necesitamos para la entrada a la red LSTM, podemos utilizar el siguiente código para entrenarlo.
length_segments = 300
path_in_feat = 'data/training_VAD/'
from torch import optim
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
batch_size = 51
segment_sets = np.array_split(train_list, len(train_list)/batch_size)
accs, losses = [], []
max_iters = 5
for epoch in range(1, max_iters):
print('Epoch:', epoch)
model.train()
cache_loss = 0
y_pred, y_true = [], []
for ii, segment_set in enumerate(segment_sets):
rand_idx = {}
optimizer.zero_grad()
# Create training batches
train_batch = np.vstack([get_fea(path_in_feat + segment, rand_idx) for segment in segment_set])
labs_batch = np.vstack([get_lab(path_in_feat + segment, rand_idx).astype(np.int16) for segment in segment_set])
assert len(labs_batch) == len(train_batch) # make sure that all frames have defined label
# Shuffle the data and place them into Pytorch tensors
shuffle = np.random.permutation(len(labs_batch))
labs_batch = torch.tensor(labs_batch.take(shuffle, axis=0).astype("float32")).to(torch.device("cuda"))
train_batch = torch.tensor(train_batch.take(shuffle, axis=0).astype("float32")).to(torch.device("cuda"))
#print(f" Mini-batch {ii+1}")
# Forward the data through the network
outputs = model(train_batch)
# Compute cost
loss = criterion(outputs, labs_batch)
# Backward step
loss.backward()
optimizer.step()
cache_loss += loss.item()
# Save
preds = outputs.cpu().detach().numpy().flatten()
y_pred.extend([1 if p >= 0.5 else 0 for p in preds])
y_true.extend(labs_batch.cpu().numpy().flatten())
y_pred = np.array(y_pred, dtype=np.float32)
y_true = np.array(y_true, dtype=np.float32)
acc = np.mean(y_true == y_pred)
loss = cache_loss/(ii+1)
print(f" Loss: {loss:.5f}")
print(f" Accuracy: {acc:.3f}")
accs.append(acc)
losses.append(loss)
import matplotlib.pyplot as plt
epochs = np.arange(max_iters - 1) + 1
plt.plot(epochs, losses, "-o", label="Loss")
plt.plot(epochs, accs, "-o", label ="Accuracy")
plt.xticks(epochs)
plt.xlabel("Epochs")
plt.legend()
plt.show()
PREGUNTAS: Analizar el código anterior cuidadosamente y ejecutarlo. A continuación, responder a las siguientes cuestiones:
Una vez entrenado el modelo, vamos a evaluarlo en un ejemplo en concreto.
Descargue de Moodle el fichero audio_sample_test.wav, con sus correspondientes características y etiquetas audio_sample_test.mat y evalúe el rendimiento en el mismo.
from google.colab import files
uploaded = files.upload()
features_file = 'audio_sample_test.mat'
import scipy.io
features = scipy.io.loadmat(features_file)['X'][:, :20]
labels = scipy.io.loadmat(features_file)['Y'].reshape(-1).astype("float32")
print(features.shape)
print(labels.shape)
def get_predictions(features):
model.eval()
with torch.no_grad():
batch = features[np.newaxis, :, :]
batch = torch.tensor(batch.astype("float32")).to(torch.device("cuda"))
# predict probabilities of voice for each frame
# shape of input is (1, 57777, 20)
preds = model(batch)
return preds.cpu().numpy()[0]
# get predictions
probs = get_predictions(features)
# get the predicted labels from the model's predictions
pred_labels = np.array([1 if p >= 0.5 else 0 for p in probs])
print(pred_labels.shape)
print(labels.shape)
# Get accuracy
acc = np.mean(labels == pred_labels)
print(f'Accuracy: {acc:.3f}')
# save predictions
np.savez("preds_P1.npz", preds=pred_labels)
# load predictions
#with np.load('preds_P1.npz') as data:
# pred_labels = data['preds']
wav_file_name = "audio_sample_test.wav"
fs, signal = read_recording(wav_file_name)
print("Signal shape: " + str(signal.shape))
print("Sample rate: " + str(fs))
segment = 1
n_seconds = 10 # only 10 seconds
n_labels_segment = n_seconds*100 # one label every 10 ms
signal_segment = signal[segment*n_seconds*fs:(segment + 1)*n_seconds*fs]
labels_segment = labels[segment*n_labels_segment:(segment + 1)*n_labels_segment]
pred_labels_segment = pred_labels[segment*n_labels_segment:(segment + 1)*n_labels_segment]
print(f"Número de muestras de los {n_seconds} segundos de la señal:", len(signal_segment))
def plot_labels(labels, color='tab:orange'):
t = np.arange(n_seconds*segment, (segment + 1)*n_seconds, 1./fs)
labels_range = np.linspace(segment*n_seconds, (segment + 1)*n_seconds, num=n_labels_segment)
plt.plot(t, signal_segment)
plt.plot(labels_range, 2*labels - 1, color=color)
plt.title(wav_file_name)
plt.xlabel("Time (seconds)")
plt.ylabel("Amplitude")
plt.grid(True)
plt.xticks(np.arange(n_seconds*segment, (segment + 1)*n_seconds + 2, 2))
plt.xlim([t[0]-1e-3, t[-1]+1e-3])
plt.show()
plot_labels(labels_segment)
plot_labels(pred_labels_segment, color="green")
import IPython
print(wav_file_name)
IPython.display.Audio(wav_file_name)